#include <errno.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

#include "definition.h"
#include "epoll_server.h"

/* Loopback address; not referenced by any code visible in this file. */
#define SERV_IP "127.0.0.1"
/* Default listening port, used when no port is given on the command line. */
#define SERV_PORT 6339
/* Capacity of the epoll_wait() result array (also passed to epoll_create()). */
#define MAXNEVENTS 200
/* Number of worker threads; also the length of do_flag[]. */
#define NUM_THREAD 8
/* Timeout used by Lock(), in milliseconds (Lock() divides it by 1000 to get seconds). */
#define LOCKTIMEOUT    5000

/* Signal handlers and record-file helpers; all four are defined later in this file. */
void stopmain(int signal);
void skippipeerror(int signal);
void writeoutrecords();
void refreshrecords(int serv_port);

int server_fd, efd;		// listening socket and epoll instance
LeonRing leon;			// state handed to initLeonRing/recvLeonRing/processLeonRing/freeLeonRing
/*
 * Mutex taken by Lock()/UnLock() around updates of num_conn.  It is
 * initialized statically because nothing in this file calls
 * pthread_mutex_init() on it before the first Lock().
 */
pthread_mutex_t g_Mutex = PTHREAD_MUTEX_INITIALIZER;
int do_flag[NUM_THREAD];	// per-worker "has new events" flags, set by dispatch_conn_new()
struct epoll_event tmp_event;	// scratch event used by main() for epoll_ctl()
int num_conn;			// count of connections
extern int getpidval;		// set to getpid() in main(); used as a log prefix
extern uint16_t lenmsgpool[10000];	// defined elsewhere; dumped by writeoutrecords()
double packet_process_speed[10000];	// per-sample rates recorded by runthread()
int pktprcsed_num = 0;		// number of samples recorded by runthread()
extern uint32_t num_packet_processed;	// defined elsewhere; sampled by runthread()
int pktprcsed_i = 0;		// next write index into packet_process_speed (wraps at 10000)
extern int lenmsgi;		// defined elsewhere; reset to 0 by refreshrecords()

char rps_outfile[60] = "rps_port.txt";	// current rate output file name
char len_outfile[60] = "len_port.txt";	// current length output file name
int filenameid = 0;		// number for filename
char *fileprefix = "./";	// output path prefix; main() replaces it with argv[2]

/*
 * One node of a worker's singly linked event queue.  Nodes are allocated in
 * dispatch_conn_new() and carry a copy of the epoll event to be handled.
 */
typedef struct event_queue_item EQ_ITEM;
struct event_queue_item {
	struct epoll_event event;	/* copy of the event reported by epoll_wait() */
	EQ_ITEM *next;			/* following node, or NULL for the last one */
};

/*
 * FIFO of EQ_ITEM nodes: eq_push() appends at the tail, eq_pop() removes from
 * the head.  Both take `lock` while they touch head/tail.
 */
typedef struct events_queue EQ;
struct events_queue {
	EQ_ITEM *head;		/* oldest node, NULL when the queue is empty */
	EQ_ITEM *tail;		/* newest node, NULL when the queue is empty */
	pthread_mutex_t lock;	/* initialized by eq_init() */
};

/*
 * Per-worker bookkeeping.  main() wakes a worker by writing one byte to
 * notify_send_fd; the worker blocks in read() on notify_receive_fd.
 */
typedef struct {
	pthread_t thread_id;	/* unique ID of this thread */
	int notify_receive_fd;	/* receiving end of notify pipe */
	int notify_send_fd;	/* sending end of notify pipe */
	struct events_queue *new_events_queue;	/* events handed over by dispatch_conn_new() */
	pthread_mutex_t lock;	/* initialized in setup_thread(); its only users are commented out */
	pthread_cond_t cond;	/* initialized in setup_thread(); its only users are commented out */
} WORKER_THREAD;

/* Return type of Lock()/UnLock(): 0 on success, -1 on failure. */
typedef signed int INT32;

/* Acquire / release the global g_Mutex. */
INT32 Lock();
INT32 UnLock();
/* Array of NUM_THREAD worker descriptors, allocated by create_thread(). */
static WORKER_THREAD *threads;
/* Statistics thread started from main(). */
void *runthread(void *args);
/* Append an item to a worker's event queue. */
static void eq_push(EQ * eq, EQ_ITEM * event);
/* Allocate the worker descriptors, their pipes and queues, then start the threads. */
void create_thread();
/* Allocate and initialize one worker's queue, mutex and condition variable. */
static void setup_thread(WORKER_THREAD * me);
/* Start one worker thread running func(arg). */
static void create_worker(void *(*func) (void *), void *arg);
/* Reset a queue to empty and initialize its lock. */
static void eq_init(EQ * eq);
/* Worker thread main loop. */
static void *worker_libevent(void *arg);
/* Queue one epoll event for the worker selected by the event's fd. */
void dispatch_conn_new(struct epoll_event event);
/* Remove and return the oldest queued item, or NULL if the queue is empty. */
static EQ_ITEM *eq_pop(EQ * eq);

void create_thread()
{
	int i = 0;
	threads = calloc(NUM_THREAD, sizeof(WORKER_THREAD));
	if (!threads) {
		perror("Can't allocate thread descriptors");
		exit(1);
	}
	for (i = 0; i < NUM_THREAD; i++) {
		int fds[2];
		if (pipe(fds)) {
			perror("Can't create notify pipe");
			exit(1);
		}

		threads[i].notify_receive_fd = fds[0];
		threads[i].notify_send_fd = fds[1];

		setup_thread(&threads[i]);
	}
	/* Create threads  */
	for (i = 0; i < NUM_THREAD; i++) {
		create_worker(worker_libevent, &threads[i]);
	}
}

/*
 * Prepare one worker descriptor before its thread is started: allocate and
 * initialize its event queue, then initialize its mutex and condition
 * variable.  Any failure terminates the process.
 *
 * me: descriptor to initialize; its pipe fds are filled in by the caller.
 */
static void setup_thread(WORKER_THREAD * me)
{
	me->new_events_queue = malloc(sizeof *me->new_events_queue);
	if (me->new_events_queue == NULL) {
		perror("Failed to allocate memory for event queue");
		exit(EXIT_FAILURE);
	}
	eq_init(me->new_events_queue);

	/* The previous messages here named unrelated things ("stack singal",
	 * "log thread"); report what actually failed. */
	if (pthread_mutex_init(&me->lock, NULL)) {
		fprintf(stderr, "Failed to initialize worker mutex\n");
		exit(-1);
	}
	if (pthread_cond_init(&me->cond, NULL)) {
		fprintf(stderr, "Failed to initialize worker condition variable\n");
		exit(-1);
	}
}

static void create_worker(void *(*func) (void *), void *arg)
{
	pthread_attr_t attr;
	int ret;

	pthread_attr_init(&attr);

	if ((ret = pthread_create(&((WORKER_THREAD *) arg)->thread_id, &attr, func, arg)) != 0) {
		fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
		exit(1);
	}

}

/* Put a queue into its empty state and create the lock that guards it. */
static void eq_init(EQ * eq)
{
	eq->head = eq->tail = NULL;
	pthread_mutex_init(&eq->lock, NULL);
}

static void *worker_libevent(void *arg)
{
	WORKER_THREAD *me = arg;
	/* Any per-thread setup can happen here; memcached_thread_init() will block until
	 * all threads have finished initializing.
	 */
	char buf[1];

	struct epoll_event event, tmp_event;
	uint16_t serv_port = SERV_PORT;	// port
	int tempfd;
	while (1) {/*
		pthread_mutex_lock(&me->lock);
		pthread_cond_wait(&me->cond, &me->lock);
		pthread_mutex_unlock(&me->lock);
		*/
		if (read(me->notify_receive_fd, buf, 1) != 1)
			fprintf(stderr, "Can't read from libevent pipe\n");
		//`printf("child thread %u, pipe read: %c\n", me->thread_id, buf[0]);
		switch (buf[0]) {
		case 'c':
			while (me->new_events_queue->tail != NULL) {

				EQ_ITEM *item = eq_pop(me->new_events_queue);
				//printf("child thread %u, event.fd: %d\n", pthread_self(), event.data.fd);

				if (NULL != item) {
					event = item->event;

					if (event.events & EPOLLIN) {	// data in
						//printf("EPOLLIN -----------\n");
						tempfd = event.data.fd;
						if (tempfd < 0) {
							printf("Why sockfd is 0??????????????????????????????\n");
							continue;
						}

						if (recvLeonRing(&leon, tempfd)
						    == -2) {
							epoll_ctl(efd, EPOLL_CTL_DEL, tempfd, &tmp_event);
							printf("%d: [recv]close client: %d. \n", getpidval, tempfd);
							Lock();
							num_conn--;
							UnLock();

							if (num_conn == 0) {
								//printf("in recv write %s %s ......\n", rps_outfile, len_outfile);
								writeoutrecords();
								refreshrecords(serv_port);
							}
						} else if (processLeonRing(&leon, tempfd) < 0) {
							epoll_ctl(efd, EPOLL_CTL_DEL, tempfd, &tmp_event);
							printf("%d: [send]close client: %d. \n", getpidval, tempfd);
							Lock();
							num_conn--;
							UnLock();
							if (num_conn == 0) {
								//printf("in send write %s %s ......\n", rps_outfile, len_outfile);
								writeoutrecords();
								refreshrecords(serv_port);
							}
						} else if (lenRingSend(leon.sockfd[tempfd]) > 0) {
							tmp_event.events = EPOLLOUT | EPOLLET;
							tmp_event.data.fd = tempfd;
							epoll_ctl(efd, EPOLL_CTL_MOD, tempfd, &tmp_event);
						}
					}

					else if (event.events & EPOLLOUT) {	// data in
						//printf("EPOLLIN -----------\n");
						tempfd = event.data.fd;
						if (tempfd < 0) {
							printf("Why sockfd is 0??????????????????????????????\n");
							continue;
						}

						int i = processLeonRing(&leon,
									tempfd);
						if (i < 0) {	// return -1 -2
							epoll_ctl(efd, EPOLL_CTL_DEL, tempfd, &tmp_event);
							//printf("Sockfd closed when send......\n");
							printf("%d: [send]close client: %d. \n", getpidval, tempfd);
							Lock();
							num_conn--;
							if (num_conn == 0) {
								UnLock();
								//printf("in send write %s %s ......\n", rps_outfile, len_outfile);
								writeoutrecords();
								refreshrecords(serv_port);
							}
						}
						if (lenRingSend(leon.sockfd[tempfd]) == 0) {
							tmp_event.events = EPOLLIN | EPOLLET;
							tmp_event.data.fd = tempfd;
							epoll_ctl(efd, EPOLL_CTL_MOD, tempfd, &tmp_event);
						}
					}
				}
			}

			break;

		}
	}
	return NULL;
}

/*
 * Acquire the global g_Mutex, waiting at most LOCKTIMEOUT milliseconds.
 *
 * Returns 0 when the mutex was acquired, -1 when pthread_mutex_timedlock()
 * failed (including timeout); in the latter case the caller does NOT hold
 * the mutex.
 */
INT32 Lock()
{
	struct timeval now;
	struct timespec deadline;
	INT32 rc;

	/* pthread_mutex_timedlock() takes an absolute wall-clock deadline. */
	gettimeofday(&now, NULL);
	deadline.tv_sec = now.tv_sec + LOCKTIMEOUT / 1000;
	/* Keep the sub-second part of the timeout instead of dropping it ... */
	deadline.tv_nsec = now.tv_usec * 1000L + (LOCKTIMEOUT % 1000) * 1000000L;
	/* ... and carry into seconds so tv_nsec stays below one second. */
	if (deadline.tv_nsec >= 1000000000L) {
		deadline.tv_sec += 1;
		deadline.tv_nsec -= 1000000000L;
	}

	rc = pthread_mutex_timedlock(&g_Mutex, &deadline);
	if (rc != 0) {
		printf("MutexLock: exec pthread_mutex_timedlock failed, RetCode=%d\n", rc);
		return -1;
	}

	return 0;
}

/*
 * Release the global g_Mutex.
 * Returns 0 on success, -1 (after logging the error code) on failure.
 */
INT32 UnLock()
{
	const INT32 rc = pthread_mutex_unlock(&g_Mutex);

	if (rc == 0)
		return 0;

	printf("MutexUnLock: exec pthread_mutex_unlock failed, RetCode=%d\n", rc);
	return -1;
}

void dispatch_conn_new(struct epoll_event event)
{

	EQ_ITEM *item = malloc(sizeof(EQ_ITEM));
	WORKER_THREAD *thread;
	item->event = event;
	item->next = NULL;

	int tid = event.data.fd % NUM_THREAD;

	do_flag[tid] = 1;
	thread = threads+tid;
	eq_push(thread->new_events_queue, item);
	//printf("Dispath event.fd %d to thread %d\n", event.data.fd, thread->thread_id);
}

/*
 * Append `item` to the end of `eq` while holding the queue lock.
 * The caller is expected to have set item->next (this function does not
 * touch it).
 */
static void eq_push(EQ * eq, EQ_ITEM * item)
{
	EQ_ITEM **link;

	pthread_mutex_lock(&eq->lock);
	/* Empty queue: the new node becomes the head; otherwise chain it
	 * behind the current tail. */
	link = (eq->tail == NULL) ? &eq->head : &eq->tail->next;
	*link = item;
	eq->tail = item;
	pthread_mutex_unlock(&eq->lock);
}

/*
 * Detach and return the oldest node of `eq`, or NULL when the queue is
 * empty.  The queue lock is held for the duration of the update.
 */
static EQ_ITEM *eq_pop(EQ * eq)
{
	EQ_ITEM *first;

	pthread_mutex_lock(&eq->lock);
	first = eq->head;
	if (first != NULL) {
		EQ_ITEM *rest = first->next;

		eq->head = rest;
		/* Removing the last node leaves the queue empty. */
		if (rest == NULL)
			eq->tail = NULL;
	}
	pthread_mutex_unlock(&eq->lock);

	return first;
}

/* Create the record file at `path`, or truncate it to empty if it exists. */
static void truncate_record_file(const char *path)
{
	FILE *fp = fopen(path, "w");

	if (fp)
		fclose(fp);
}

/*
 * Entry point: ./run [PORT] [file path]
 *
 * Starts the statistics thread, installs the SIGINT/SIGPIPE handlers, opens
 * a non-blocking listening socket on PORT (default SERV_PORT), starts the
 * worker pool and then loops on epoll_wait():
 *   - events on the listening socket accept every pending client and
 *     register it edge-triggered for input;
 *   - all other events are queued for a worker via dispatch_conn_new(), and
 *     every worker that received something is woken through its pipe.
 *
 * Returns 0, or 1 if epoll_wait() fails with an error other than EINTR.
 */
int main(int argc, char *argv[])
{
	struct sockaddr_in server_addr, client_addr;
	socklen_t len_cli;
	struct epoll_event events[MAXNEVENTS];
	struct sigaction action;
	pthread_t pth;
	uint16_t serv_port = SERV_PORT;	// port
	const char wake = 'c';
	int num_epwait, tmpi;
	int status = 0;
	int tempfd;
	int i;

	if (argc < 3)
		printf("Usage: ./run [PORT] [file path]\n");

	pthread_create(&pth, NULL, runthread, NULL);	// just for count

	signal(SIGINT, stopmain);
	memset(&action, 0, sizeof action);
	action.sa_handler = skippipeerror;
	sigemptyset(&action.sa_mask);
	action.sa_flags = 0;
	sigaction(SIGPIPE, &action, NULL);
	getpidval = getpid();

	if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
		printf("%d: socket failed ---------\n", getpidval);
		exit(-1);
	}

	fcntl(server_fd, F_SETFL, fcntl(server_fd, F_GETFL, 0) | O_NONBLOCK);

	if (argc >= 2)
		serv_port = atoi(argv[1]);

	if (argc >= 3)
		fileprefix = argv[2];	// dir

	/* fileprefix comes from the command line, so bound the writes. */
	snprintf(rps_outfile, sizeof rps_outfile, "%srps_%d_%d.txt", fileprefix, serv_port, filenameid);
	snprintf(len_outfile, sizeof len_outfile, "%slen_%d_%d.txt", fileprefix, serv_port, filenameid);

	memset(&server_addr, 0, sizeof server_addr);
	server_addr.sin_family = AF_INET;
	server_addr.sin_port = htons(serv_port);
	server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

	if (bind(server_fd, (struct sockaddr *)&server_addr, sizeof server_addr) == -1) {
		printf("%d: bind failed -----------\n", getpidval);
		exit(-1);
	}

	if (listen(server_fd, 256) == -1) {
		printf("%d: listen failed ---------\n", getpidval);
		exit(-1);
	}

	if ((efd = epoll_create(MAXNEVENTS)) == -1) {
		printf("%d: epoll_create failed ---\n", getpidval);
		exit(-1);
	}

	/* The listening socket stays level-triggered. */
	tmp_event.events = EPOLLIN;
	tmp_event.data.fd = server_fd;

	if (epoll_ctl(efd, EPOLL_CTL_ADD, server_fd, &tmp_event) == -1) {
		printf("%d: epoll_ctl failed ------\n", getpidval);
		exit(-1);
	}

	initLeonRing(&leon);	// init buffer

	num_conn = 0;
	create_thread();
	while (1) {
		for (i = 0; i < NUM_THREAD; i++) {
			do_flag[i] = 0;
		}
		num_epwait = epoll_wait(efd, events, MAXNEVENTS, -1);	//the number of events;
		if (num_epwait < 0) {
			if (errno == EINTR)
				continue;
			/* A persistent error would otherwise spin this loop forever. */
			perror("epoll_wait");
			status = 1;
			break;
		}

		for (tmpi = 0; tmpi < num_epwait; tmpi++) {	// for every new event

			if (events[tmpi].data.fd != server_fd) {
				dispatch_conn_new(events[tmpi]);
				continue;
			}

			/* Drain the accept queue; the socket is non-blocking. */
			while (1) {
				/* accept() overwrites the length, so reset it each time. */
				len_cli = sizeof client_addr;
				tempfd = accept(server_fd, (struct sockaddr *)&client_addr, &len_cli);
				if (tempfd < 0)
					break;

				printf("%d: new clientssss: %d. \n", getpidval, tempfd);	// whether sockfd right?
				fcntl(tempfd, F_SETFL, fcntl(tempfd, F_GETFL, 0) | O_NONBLOCK);
				tmp_event.events = EPOLLIN | EPOLLET;
				tmp_event.data.fd = tempfd;
				epoll_ctl(efd, EPOLL_CTL_ADD, tempfd, &tmp_event);

				/*
				 * NOTE(review): num_conn is read and incremented here
				 * without Lock(), while the workers decrement it under
				 * the lock.
				 */
				if (num_conn == 0) {	// clear files
					truncate_record_file(rps_outfile);
					truncate_record_file(len_outfile);
				}
				num_conn++;	// connections count ++
			}
		}		// finished for-loop

		/* Wake every worker that was handed at least one event. */
		for (i = 0; i < NUM_THREAD; i++) {
			if (do_flag[i] == 1) {
				if (write(threads[i].notify_send_fd, &wake, 1) != 1) {
					perror("Writing to thread notify pipe");
				}
			}
		}
	}			// finished while-loop

	close(server_fd);
	close(efd);
	return status;
}

/*
 * SIGINT handler: close the listening socket and the epoll instance, release
 * the LeonRing state, dump the collected records and terminate the process
 * with _exit(0).
 *
 * NOTE(review): stdio and the helpers called here are not async-signal-safe;
 * consider setting a flag and doing this work from the main loop.
 */
void stopmain(int signal)
{
	(void)signal;		/* the signal number is not used */

	fputs("**** Ctrl+C ****\n", stdout);

	close(server_fd);
	close(efd);
	freeLeonRing(&leon);

	writeoutrecords();	// write records

	fputs("**** End Ctrl+C \n", stdout);
	_exit(0);
}

/*
 * SIGPIPE handler: only reports that the signal arrived, so the signal's
 * default action (terminating the process) does not take place.
 */
void skippipeerror(int signal)
{
	(void)signal;		/* the signal number is not used */
	fputs("**** SIGPIPE ****\n", stdout);
}

/*
 * Append the collected statistics to the current record files:
 *   - len_outfile receives all 10000 entries of lenmsgpool, one per line;
 *   - rps_outfile receives the recorded packet_process_speed samples (at
 *     most 10000) followed by the total number of processed packets.
 *
 * A file that cannot be opened is reported on stdout and skipped; nothing
 * after the failing open is written.
 */
void writeoutrecords()
{
	FILE *len_fp = NULL;
	FILE *rps_fp = NULL;
	int sample_count;
	int idx;

	len_fp = fopen(len_outfile, "a+");
	if (!len_fp) {
		printf("Cannot open %s\n", len_outfile);
		goto out;
	}
	for (idx = 0; idx < 10000; idx++) {
		fprintf(len_fp, "%u\n", (unsigned)lenmsgpool[idx]);
	}

	rps_fp = fopen(rps_outfile, "a+");
	/* Check the handle that was just opened (the old code re-tested the
	 * length file and then wrote through a possibly NULL pointer). */
	if (!rps_fp) {
		printf("Cannot open %s\n", rps_outfile);
		goto out;
	}

	sample_count = pktprcsed_num > 10000 ? 10000 : pktprcsed_num;
	for (idx = 0; idx < sample_count; idx++) {
		fprintf(rps_fp, "%lf\n", packet_process_speed[idx]);
	}
	/* num_packet_processed is uint32_t; convert to match %lu. */
	fprintf(rps_fp, "total prcsed: %lu\n", (unsigned long)num_packet_processed);

out:
	/* Each handle is closed under its own guard. */
	if (rps_fp)
		fclose(rps_fp);
	if (len_fp)
		fclose(len_fp);
}

/*
 * Start a new recording round: bump the file-name counter, rebuild both
 * output file names from fileprefix / serv_port / filenameid, and reset the
 * statistics counters and the length pool.
 *
 * serv_port: port number embedded in the generated file names.
 */
void refreshrecords(int serv_port)
{
	filenameid++;
	/* fileprefix may be arbitrarily long; never write past the name buffers. */
	snprintf(rps_outfile, sizeof rps_outfile, "%srps_%d_%d.txt", fileprefix, serv_port, filenameid);
	snprintf(len_outfile, sizeof len_outfile, "%slen_%d_%d.txt", fileprefix, serv_port, filenameid);
	pktprcsed_i = 0;
	num_packet_processed = 0;
	pktprcsed_num = 0;
	lenmsgi = 0;
	memset(lenmsgpool, 0, 10000 * sizeof(uint16_t));	// init lenmsgpool
}

void *runthread(void *args)
{
	packet_process_speed[0] = 0;
	int preprcsed = num_packet_processed;
	int curprcsed;
	//struct timespec pretime;
	//clock_gettime(CLOCK_MONOTONIC, &pretime);
	//struct timespec curtime;
	struct timeval pretime;
	struct timeval curtime;
	gettimeofday(&pretime, NULL);
	//FILE *ouc = fopen("ttt.txt", "w");
	//int getpidval = getpid();
	while (1) {
		sleep(1);	// wait for 1 second
		curprcsed = num_packet_processed;	// must!!!!!!!
		//curtime = clock();
		//clock_gettime(CLOCK_MONOTONIC, &curtime);
		gettimeofday(&curtime, NULL);
		if (curprcsed == 0)
			preprcsed = 0;
		if (curprcsed > preprcsed) {
			//packet_process_speed[pktprcsed_i] = (double)(curprcsed - preprcsed);
			int timetime = 1000000 * (curtime.tv_sec - pretime.tv_sec) + curtime.tv_usec - pretime.tv_usec;
			packet_process_speed[pktprcsed_i] = (double)(curprcsed - preprcsed)
			    //* 1000000.0 / (curtime.tv_usec - pretime.tv_usec);
			    * 1000000 / timetime;
			printf("%d: rps: %d, time: %d\n", getpidval, curprcsed - preprcsed, timetime);
			preprcsed = curprcsed;
			pktprcsed_i = (pktprcsed_i + 1) % 10000;
			pktprcsed_num++;
		}
		pretime = curtime;	//
	}
}
